---
title: Channels and Select Deep Dive
---

Channels and the `select` statement are the heart of Go's concurrency model. The previous module covered the basics; this module explores the advanced patterns and techniques that make that model so powerful and expressive.

## Channel Directionality

Go channels can be directional, allowing you to specify whether a channel is for sending, receiving, or both. This provides compile-time safety and makes code intentions clearer.

<UniversalEditor title="Channel Directionality" compare={true}>
```javascript !! js
// JavaScript: No direct equivalent, but can simulate with different interfaces
/**
 * Receive-only view of a channel: it exposes `receive()` and nothing else,
 * so holders of this wrapper have no method for sending.
 */
class ReadOnlyChannel {
  /**
   * @param {object} channel - underlying channel; must provide `receive()`.
   */
  constructor(channel) {
    this.channel = channel;
  }

  /**
   * Delegates to the wrapped channel.
   * @returns {*} whatever the underlying `receive()` returns.
   */
  receive() {
    const { channel } = this;
    return channel.receive();
  }
}

/**
 * Send-only view of a channel: it exposes `send()` and nothing else,
 * so holders of this wrapper have no method for receiving.
 */
class WriteOnlyChannel {
  /**
   * @param {object} channel - underlying channel; must provide `send(data)`.
   */
  constructor(channel) {
    this.channel = channel;
  }

  /**
   * Delegates to the wrapped channel.
   * @param {*} data - value to hand to the underlying channel.
   * @returns {*} whatever the underlying `send()` returns.
   */
  send(data) {
    const { channel } = this;
    return channel.send(data);
  }
}

// Usage
// NOTE: EventChannel is not defined in this snippet; presumably it comes from
// an earlier example — confirm before running.
// Both wrappers share the same underlying channel, so values sent through
// `writeOnly` are the ones `readOnly` receives.
const baseChannel = new EventChannel();
const readOnly = new ReadOnlyChannel(baseChannel);
const writeOnly = new WriteOnlyChannel(baseChannel);
```

```go !! go
// Go: Channel directionality
package main

import (
	"fmt"
	"time"
)

// sender takes a send-only channel (chan<- string): it publishes a single
// greeting and then closes the channel, which a send-only handle may do.
func sender(ch chan<- string) {
	defer close(ch) // closing is allowed through a send-only handle
	ch <- "Hello from sender"
}

// receiver takes a receive-only channel (<-chan string): it reads exactly one
// message and prints it.
//
// Through a receive-only handle the following do not compile:
//
//	ch <- "test" // send
//	close(ch)    // close
func receiver(ch <-chan string) {
	fmt.Println("Received:", <-ch)
}

// processor takes a bidirectional channel: it receives one message and sends
// the processed form back on the same channel.
func processor(ch chan string) {
	// The receive on the right is evaluated first, then the result is sent.
	ch <- "Processed: " + <-ch
}

// main demonstrates that one bidirectional channel value can be handed to
// functions that declare a narrower direction.
func main() {
	// The same channel is seen as send-only by sender and receive-only by
	// receiver; the conversion happens implicitly at the call.
	greetings := make(chan string)
	go sender(greetings)
	receiver(greetings)

	// processor needs both directions, so it gets a separate channel that is
	// used as a request/response pair.
	work := make(chan string)
	go processor(work)
	work <- "test message"
	fmt.Println("Result:", <-work)
}
```
</UniversalEditor>

## Advanced Select Patterns

### Select with Default

The `default` case in `select` allows for non-blocking channel operations:

<UniversalEditor title="Select with Default" compare={true}>
```javascript !! js
// JavaScript: Simulating non-blocking operations
/**
 * Minimal channel simulation whose operations never block.
 * Values wait in `buffer` until received; callbacks wait in `listeners`
 * until a value is sent.
 */
class NonBlockingChannel {
  constructor() {
    this.buffer = [];
    this.listeners = [];
  }

  /**
   * Hands `data` to the oldest waiting listener, or buffers it when nobody
   * is waiting.
   * @returns {boolean} true if a listener consumed the value immediately,
   *   false if the value was buffered instead.
   */
  send(data) {
    if (this.listeners.length === 0) {
      this.buffer.push(data);
      return false;
    }
    const deliver = this.listeners.shift();
    deliver(data);
    return true;
  }

  /**
   * @returns {*} the oldest buffered value, or null when the buffer is empty.
   */
  receive() {
    return this.buffer.length > 0 ? this.buffer.shift() : null;
  }

  /**
   * Invokes `callback` with the oldest buffered value right away, or queues
   * the callback until the next `send`.
   */
  receiveAsync(callback) {
    if (this.buffer.length === 0) {
      this.listeners.push(callback);
      return;
    }
    callback(this.buffer.shift());
  }
}

// Usage
const ch = new NonBlockingChannel();

// Non-blocking send
// No listener is registered yet, so the value is buffered and send()
// returns false — this logs "Sent: false" even though the value is kept.
const sent = ch.send("hello");
console.log("Sent:", sent);

// Non-blocking receive
// Pulls the buffered "hello"; a second receive() would return null.
const received = ch.receive();
console.log("Received:", received);
```

```go !! go
// Go: Select with default for non-blocking operations
package main

import (
	"fmt"
	"time"
)

// nonBlockingSend attempts to send value on ch without waiting.
// It reports true when the send went through and false when it would have
// blocked (buffer full, or no receiver ready on an unbuffered channel).
func nonBlockingSend(ch chan<- string, value string) bool {
	delivered := false
	select {
	case ch <- value:
		delivered = true
	default:
		// The send would block; give up immediately.
	}
	return delivered
}

// nonBlockingReceive attempts to receive from ch without waiting.
// The boolean is true when a receive happened and false when it would have
// blocked. A closed channel never blocks, so it yields ("", true).
func nonBlockingReceive(ch <-chan string) (string, bool) {
	var (
		got      string
		received bool
	)
	select {
	case got = <-ch:
		received = true
	default:
		// Nothing is ready; report the zero value.
	}
	return got, received
}

// main walks a two-slot buffered channel through every non-blocking outcome:
// two successful sends, one rejected send, one successful receive, and a
// final receive attempt.
func main() {
	ch := make(chan string, 2)

	// The first two sends fit into the buffer.
	if nonBlockingSend(ch, "first") {
		fmt.Println("Successfully sent first")
	}
	if nonBlockingSend(ch, "second") {
		fmt.Println("Successfully sent second")
	}

	// The buffer is now full, so the third send is rejected instead of blocking.
	if !nonBlockingSend(ch, "third") {
		fmt.Println("Failed to send third - channel full")
	}

	// One value comes straight out of the buffer.
	value, received := nonBlockingReceive(ch)
	if received {
		fmt.Println("Received:", value)
	}

	// Try once more; report either the value or that nothing was available.
	switch value, received := nonBlockingReceive(ch); {
	case received:
		fmt.Println("Received:", value)
	default:
		fmt.Println("No data available")
	}
}
```
</UniversalEditor>

### Select with Timeout

Using `select` with `time.After` provides timeout functionality:

<UniversalEditor title="Select with Timeout" compare={true}>
```javascript !! js
// JavaScript: Promise with timeout
/**
 * Races `promise` against a timer.
 *
 * @param {Promise} promise - the operation to wait for.
 * @param {number} timeoutMs - how long to wait before giving up.
 * @returns {Promise} settles like `promise` if it finishes first; otherwise
 *   rejects with `new Error('Timeout')`.
 */
function withTimeout(promise, timeoutMs) {
  let timer;
  const timeoutPromise = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('Timeout')), timeoutMs);
  });

  // Always cancel the timer once the race is decided. Otherwise a pending
  // timer stays scheduled (and keeps the process alive) for the full
  // timeout even though the operation has already finished.
  return Promise.race([promise, timeoutPromise])
    .finally(() => clearTimeout(timer));
}

// Usage
/**
 * Fetches `url` and resolves with the response body as text, giving up
 * after `timeoutMs`.
 *
 * Failures are logged rather than propagated: on timeout or any other error
 * the function resolves with `undefined`.
 */
async function fetchWithTimeout(url, timeoutMs) {
  try {
    const response = await withTimeout(fetch(url), timeoutMs);
    const body = await response.text();
    return body;
  } catch (error) {
    if (error.message !== 'Timeout') {
      console.log('Request failed:', error.message);
      return;
    }
    console.log('Request timed out');
  }
}

// Simulate async operation
// Resolves after 3000 ms, which is longer than the 1000 ms limit used below.
const slowOperation = new Promise(resolve => {
  setTimeout(() => resolve('Operation completed'), 3000);
});

// The timer wins the race, so the rejection handler logs "Error: Timeout".
withTimeout(slowOperation, 1000)
  .then(result => console.log(result))
  .catch(error => console.log('Error:', error.message));
```

```go !! go
// Go: Select with timeout
package main

import (
	"fmt"
	"time"
)

// operationWithTimeout runs operation in its own goroutine and waits for it
// for at most timeout.
//
// It returns the operation's result and a nil error when the operation
// finishes in time, or "" and an error once the timeout elapses. The
// operation itself is not interrupted on timeout; it keeps running in the
// background.
func operationWithTimeout(operation func() string, timeout time.Duration) (string, error) {
	// Capacity 1 lets the goroutine deliver its result and exit even when
	// nobody is listening any more because the timeout already fired.
	done := make(chan string, 1)
	go func() { done <- operation() }()

	expired := time.After(timeout)
	select {
	case <-expired:
		return "", fmt.Errorf("operation timed out after %v", timeout)
	case out := <-done:
		return out, nil
	}
}

// slowOperation simulates a long-running task: it sleeps for three seconds
// and then reports completion.
func slowOperation() string {
	const workTime = 3 * time.Second
	time.Sleep(workTime)
	return "Operation completed"
}

// main runs the three-second operation twice: first with a one-second limit
// (expected to time out), then with a five-second limit (expected to finish).
func main() {
	limits := []time.Duration{1 * time.Second, 5 * time.Second}
	for _, limit := range limits {
		result, err := operationWithTimeout(slowOperation, limit)
		if err != nil {
			fmt.Println("Error:", err)
			continue
		}
		fmt.Println("Result:", result)
	}
}
```
</UniversalEditor>

## Channel Patterns

### Fan-Out Pattern

The fan-out pattern distributes work from one channel to multiple workers:

<UniversalEditor title="Fan-Out Pattern" compare={true}>
```javascript !! js
// JavaScript: Fan-out with Promise.all
/**
 * Distributes tasks round-robin over a fixed number of simulated workers.
 */
class FanOut {
  /**
   * @param {number} workerCount - how many workers share the tasks.
   */
  constructor(workerCount) {
    this.workerCount = workerCount;
  }

  /**
   * Assigns task i to worker (i % workerCount) and waits for all of them.
   * @param {Array} tasks
   * @returns {Promise<string[]>} one result per task, in task order.
   */
  async process(tasks) {
    const workers = Array.from(
      { length: this.workerCount },
      (_, id) => this.createWorker(id)
    );

    const pending = tasks.map(
      (task, position) => workers[position % this.workerCount](task)
    );

    return Promise.all(pending);
  }

  /**
   * Builds a worker function that waits a random time below one second and
   * then reports which worker handled the task.
   */
  createWorker(id) {
    return async (task) => {
      const delayMs = Math.random() * 1000; // simulated work
      await new Promise(resolve => setTimeout(resolve, delayMs));
      return `Worker ${id} processed: ${task}`;
    };
  }
}

// Usage
// Ten tasks ("Task 0" … "Task 9") are spread over three workers; the results
// array keeps task order because Promise.all preserves input order.
const fanOut = new FanOut(3);
const tasks = Array.from({length: 10}, (_, i) => `Task ${i}`);
fanOut.process(tasks).then(results => {
  console.log("All tasks completed:", results);
});
```

```go !! go
// Go: Fan-out pattern
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fanOut starts workerCount workers that all read from input and returns one
// receive-only result channel per worker.
//
// The result channels are unbuffered and are closed together, only after
// every worker has returned. Callers therefore have to receive from all
// returned channels concurrently: a worker whose channel is not being read
// blocks on its send and keeps the other channels from ever being closed.
func fanOut(input <-chan string, workerCount int) []<-chan string {
	writable := make([]chan string, workerCount)
	readable := make([]<-chan string, workerCount)

	var wg sync.WaitGroup
	for id := range writable {
		ch := make(chan string)
		writable[id] = ch
		readable[id] = ch // same channel, exposed to the caller as receive-only

		wg.Add(1)
		go worker(id, input, ch, &wg)
	}

	// Close every result channel once the last worker is done.
	go func() {
		wg.Wait()
		for _, ch := range writable {
			close(ch)
		}
	}()

	return readable
}

// worker consumes tasks from input until it is closed. For every task it
// sleeps a random time below one second (simulated work) and sends a report
// on output. wg.Done is called when the worker returns.
func worker(id int, input <-chan string, output chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		task, open := <-input
		if !open {
			return
		}

		pause := time.Duration(rand.Intn(1000)) * time.Millisecond
		time.Sleep(pause)
		output <- fmt.Sprintf("Worker %d processed: %s", id, task)
	}
}

// main feeds ten tasks to three workers and gathers every result.
func main() {
	// Create input channel
	input := make(chan string)

	// Start fan-out
	outputs := fanOut(input, 3)

	// Send tasks
	go func() {
		defer close(input)
		for i := 0; i < 10; i++ {
			input <- fmt.Sprintf("Task %d", i)
		}
	}()

	// Drain every worker's channel concurrently and merge into one stream.
	// Reading the channels one after another deadlocks: the channels are
	// unbuffered and only closed once all workers have exited, so a worker
	// whose channel is not being read yet blocks on its send forever.
	merged := make(chan string)
	var wg sync.WaitGroup
	for _, output := range outputs {
		wg.Add(1)
		go func(out <-chan string) {
			defer wg.Done()
			for result := range out {
				merged <- result
			}
		}(output)
	}

	// Close the merged stream once every worker channel has been drained.
	go func() {
		wg.Wait()
		close(merged)
	}()

	// Collect results from all workers
	var results []string
	for result := range merged {
		results = append(results, result)
	}

	fmt.Println("All tasks completed:", results)
}
```
</UniversalEditor>

### Fan-In Pattern

The fan-in pattern combines multiple channels into one:

<UniversalEditor title="Fan-In Pattern" compare={true}>
```javascript !! js
// JavaScript: Fan-in with Promise.race or Promise.all
/**
 * Merges several promises into one list of results ordered by completion.
 */
class FanIn {
  constructor() {
    this.output = [];
  }

  /**
   * Waits for every promise and returns their results in the order in which
   * they settled (not the order in which they were passed in).
   *
   * @param {Promise[]} promises
   * @returns {Promise<Array>} results in completion order; rejects as soon
   *   as any input rejects.
   */
  async collect(promises) {
    const results = [];

    // Track unfinished promises by their original index. A Map keeps the key
    // stable when entries are removed; removing from an array by original
    // index stops matching positions once the array has shrunk, which can
    // leave a settled promise in the race forever.
    const pending = new Map(
      promises.map((promise, index) => [
        index,
        promise.then(result => ({ index, result })),
      ])
    );

    // Collect results as they arrive
    while (pending.size > 0) {
      const { index, result } = await Promise.race(pending.values());
      results.push(result);
      pending.delete(index);
    }

    return results;
  }
}

// Usage
const fanIn = new FanIn();
// The timers fire at 500 ms, 1000 ms and 1500 ms, so the promises settle in
// the order "Result 2", "Result 1", "Result 3".
const promises = [
  new Promise(resolve => setTimeout(() => resolve("Result 1"), 1000)),
  new Promise(resolve => setTimeout(() => resolve("Result 2"), 500)),
  new Promise(resolve => setTimeout(() => resolve("Result 3"), 1500))
];

fanIn.collect(promises).then(results => {
  console.log("All results:", results);
});
```

```go !! go
// Go: Fan-in pattern
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// fanIn merges any number of input channels into a single output channel.
// One forwarding goroutine runs per input; the output is closed after every
// input has been closed and drained. With no inputs the output is closed
// right away.
func fanIn(channels ...<-chan string) <-chan string {
	merged := make(chan string)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	forward := func(src <-chan string) {
		defer pending.Done()
		for v := range src {
			merged <- v
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Only the goroutine that knows all forwarders are done closes merged.
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}

// producer returns a channel on which a background goroutine emits three
// items labelled with id, pausing a random time below one second before
// each. The channel is closed after the third item.
func producer(id int) <-chan string {
	items := make(chan string)

	emit := func() {
		defer close(items)
		for n := 0; n < 3; n++ {
			pause := time.Duration(rand.Intn(1000)) * time.Millisecond
			time.Sleep(pause)
			items <- fmt.Sprintf("Producer %d: Item %d", id, n)
		}
	}
	go emit()

	return items
}

func main() {
	// Create multiple producers
	producer1 := producer(1)
	producer2 := producer(2)
	producer3 := producer(3)
	
	// Fan-in all producers
	combined := fanIn(producer1, producer2, producer3)
	
	// Collect all results
	var results []string
	for result := range combined {
		results = append(results, result)
		fmt.Println("Received:", result)
	}
	
	fmt.Println("All results collected:", len(results))
}
```
</UniversalEditor>

## Channel Composition

### Pipeline with Error Handling

Building robust pipelines with error handling:

<UniversalEditor title="Pipeline with Error Handling" compare={true}>
```javascript !! js
// JavaScript: Pipeline with error handling
/**
 * Sequential pipeline: each stage receives the previous stage's result.
 */
class Pipeline {
  constructor() {
    this.stages = [];
  }

  /**
   * Appends a stage (sync or async function of one argument).
   * @returns {Pipeline} this, so calls can be chained.
   */
  addStage(stage) {
    this.stages.push(stage);
    return this;
  }

  /**
   * Runs `input` through every stage in insertion order.
   * A failing stage is logged with console.error and its error is rethrown,
   * so later stages do not run.
   * @returns {Promise<*>} the last stage's result.
   */
  async process(input) {
    let value = input;

    for (let i = 0; i < this.stages.length; i += 1) {
      const runStage = this.stages[i];
      try {
        value = await runStage(value);
      } catch (error) {
        console.error('Pipeline stage failed:', error);
        throw error;
      }
    }

    return value;
  }
}

// Usage
// Three stages: validate and double, keep values above 5, sum what is left.
const pipeline = new Pipeline()
  .addStage(async (data) => {
    // Reject the whole batch if any value is negative.
    if (data.some(x => x < 0)) {
      throw new Error('Negative numbers not allowed');
    }
    return data.map(x => x * 2);
  })
  .addStage(async (data) => {
    return data.filter(x => x > 5);
  })
  .addStage(async (data) => {
    return data.reduce((sum, x) => sum + x, 0);
  });

// Test with valid data
// [1..5] -> [2,4,6,8,10] -> [6,8,10] -> 24, so this logs "Result: 24".
pipeline.process([1, 2, 3, 4, 5])
  .then(result => console.log('Result:', result))
  .catch(error => console.log('Error:', error.message));

// Test with invalid data
// The first stage throws on -2, so this logs
// "Error: Negative numbers not allowed".
pipeline.process([1, -2, 3, 4, 5])
  .then(result => console.log('Result:', result))
  .catch(error => console.log('Error:', error.message));
```

```go !! go
// Go: Pipeline with error handling
package main

import (
	"fmt"
	"time"
)

// PipelineStage is the shape shared by stage1, stage2 and stage3: a function
// that consumes a receive-only int channel and returns a receive-only channel
// of results together with a receive-only channel of errors.
type PipelineStage func(<-chan int) (<-chan int, <-chan error)

// stage1 validates and doubles. Every non-negative input value is sent
// doubled on the first returned channel; every negative value produces an
// error on the second channel and is otherwise skipped.
//
// Both channels are unbuffered and are closed when input is exhausted. The
// stage blocks on whichever channel it is currently sending to, so callers
// must be ready to receive from both.
func stage1(input <-chan int) (<-chan int, <-chan error) {
	doubled := make(chan int)
	rejected := make(chan error)

	go func() {
		// Close the error channel first, then the result channel.
		defer func() {
			close(rejected)
			close(doubled)
		}()

		for v := range input {
			if v >= 0 {
				doubled <- v * 2
				continue
			}
			rejected <- fmt.Errorf("negative number not allowed: %d", v)
		}
	}()

	return doubled, rejected
}

// stage2 filters: only values greater than 5 are passed on. It never reports
// an error; the error channel exists to match the stage signature and is
// closed, together with the result channel, when input is exhausted.
func stage2(input <-chan int) (<-chan int, <-chan error) {
	kept := make(chan int)
	failures := make(chan error)

	go func() {
		// Close the error channel first, then the result channel.
		defer func() {
			close(failures)
			close(kept)
		}()

		for v := range input {
			if v <= 5 {
				continue
			}
			kept <- v
		}
	}()

	return kept, failures
}

// stage3 reduces: it adds up every input value and, once input is closed,
// sends the total as a single result. When input carried no values nothing
// is sent at all. It never reports an error; both channels are closed at
// the end.
func stage3(input <-chan int) (<-chan int, <-chan error) {
	totals := make(chan int)
	failures := make(chan error)

	go func() {
		// Close the error channel first, then the result channel.
		defer func() {
			close(failures)
			close(totals)
		}()

		total, sawValue := 0, false
		for v := range input {
			total += v
			sawValue = true
		}

		if sawValue {
			totals <- total
		}
	}()

	return totals, failures
}

// main wires the three stages together, feeds them test data and prints the
// results followed by any errors the stages reported.
func main() {
	// Create input channel
	input := make(chan int)

	// Build pipeline
	stage1Out, stage1Err := stage1(input)
	stage2Out, stage2Err := stage2(stage1Out)
	stage3Out, stage3Err := stage3(stage2Out)

	// Send test data
	go func() {
		defer close(input)
		testData := []int{1, 2, 3, 4, 5}
		for _, value := range testData {
			input <- value
		}
	}()

	// Start draining every error channel before waiting for results. The
	// error channels are unbuffered, so a stage that reports an error blocks
	// until somebody receives it; reading errors only after the results
	// would stall the whole pipeline on the first error.
	drain := func(errCh <-chan error) <-chan []error {
		done := make(chan []error, 1)
		go func() {
			var collected []error
			for err := range errCh {
				collected = append(collected, err)
			}
			done <- collected
		}()
		return done
	}
	pendingErrs := []<-chan []error{
		drain(stage1Err),
		drain(stage2Err),
		drain(stage3Err),
	}

	// Collect from final stage
	var results []int
	for result := range stage3Out {
		results = append(results, result)
	}

	// Gather the errors from all stages, in stage order
	var errors []error
	for _, done := range pendingErrs {
		errors = append(errors, (<-done)...)
	}

	fmt.Println("Results:", results)
	if len(errors) > 0 {
		fmt.Println("Errors:", errors)
	}
}
```
</UniversalEditor>

## Context and Cancellation

Using context with channels for cancellation:

<UniversalEditor title="Context and Cancellation" compare={true}>
```javascript !! js
// JavaScript: AbortController for cancellation
/**
 * Processes items one by one and stops as soon as cancellation is requested.
 * Owns an AbortController whose signal callers pass back into `process`.
 */
class CancellablePipeline {
  constructor() {
    this.controller = new AbortController();
  }

  /**
   * Doubles every item, spending 100 ms of simulated work on each.
   * The signal is checked before each item.
   *
   * @param {Iterable<number>} data
   * @param {AbortSignal} signal
   * @returns {Promise<number[]>} doubled values; rejects with
   *   Error('Pipeline cancelled') once the signal is aborted.
   */
  async process(data, signal) {
    const simulateWork = () => new Promise(resolve => setTimeout(resolve, 100));
    const doubled = [];

    for (const item of data) {
      if (signal.aborted) {
        throw new Error('Pipeline cancelled');
      }

      await simulateWork();
      doubled.push(item * 2);
    }

    return doubled;
  }

  /** Requests cancellation through the owned controller. */
  cancel() {
    this.controller.abort();
  }
}

// Usage
const pipeline = new CancellablePipeline();

// Start processing
// Five items at 100 ms each would take about 500 ms if left alone.
const processPromise = pipeline.process([1, 2, 3, 4, 5], pipeline.controller.signal);

// Cancel after 200ms
// The abort lands partway through, so the next check inside process()
// throws and the promise rejects.
setTimeout(() => {
  pipeline.cancel();
}, 200);

// Expected to log "Error: Pipeline cancelled".
processPromise
  .then(results => console.log('Results:', results))
  .catch(error => console.log('Error:', error.message));
```

```go !! go
// Go: Context with channels for cancellation
package main

import (
	"context"
	"fmt"
	"time"
)

// processWithContext doubles every value read from input and sends it on the
// returned channel, spending 100ms of simulated work per value.
//
// The worker goroutine stops, and the returned channel is closed, when
// either input is closed or ctx is cancelled.
func processWithContext(ctx context.Context, input <-chan int) <-chan int {
	output := make(chan int)

	go func() {
		defer close(output)

		for {
			select {
			case value, ok := <-input:
				if !ok {
					// A closed channel is always ready to receive and would
					// yield zero values forever; treat closure as "done".
					return
				}

				// Simulate work
				time.Sleep(100 * time.Millisecond)

				// Guard the send as well: if the consumer stopped reading
				// because ctx was cancelled, a bare send would block forever
				// and leak this goroutine.
				select {
				case output <- value * 2:
				case <-ctx.Done():
					fmt.Println("Processing cancelled:", ctx.Err())
					return
				}
			case <-ctx.Done():
				fmt.Println("Processing cancelled:", ctx.Err())
				return
			}
		}
	}()

	return output
}

// main pushes up to ten values through processWithContext under a 300ms
// deadline and reports how many results arrived before cancellation.
func main() {
	// Create context with timeout
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	// Create input channel
	input := make(chan int)

	// Start processing
	output := processWithContext(ctx, input)

	// Send data
	go func() {
		defer close(input)
		for i := 1; i <= 10; i++ {
			select {
			case input <- i:
				fmt.Printf("Sent: %d\n", i)
			case <-ctx.Done():
				fmt.Println("Sending cancelled")
				return
			}
		}
	}()

	// Collect results
	var results []int
collect:
	for {
		select {
		case result, ok := <-output:
			if !ok {
				// The processor has shut down and closed its output. Without
				// this check the closed channel would keep delivering zero
				// values that would be counted as results.
				if ctx.Err() != nil {
					fmt.Println("Collection cancelled")
				}
				break collect
			}
			results = append(results, result)
			fmt.Printf("Received: %d\n", result)
		case <-ctx.Done():
			fmt.Println("Collection cancelled")
			break collect
		}
	}

	fmt.Printf("Collected %d results\n", len(results))
}
```
</UniversalEditor>

## Channel Best Practices

### 1. Always Close Channels from the Sender

<UniversalEditor title="Channel Closing Best Practices" compare={true}>
```javascript !! js
// JavaScript: Resource cleanup patterns
/**
 * Tracks resources and cleans all of them up exactly once on close().
 */
class ResourceManager {
  constructor() {
    this.resources = new Set();
    this.closed = false;
  }

  /**
   * Registers a resource (an object with a `cleanup()` method).
   * @throws {Error} 'Manager is closed' when called after close().
   */
  addResource(resource) {
    if (this.closed) throw new Error('Manager is closed');
    this.resources.add(resource);
  }

  /**
   * Calls cleanup() on every tracked resource and forgets them.
   * Calling close() again is a no-op.
   */
  close() {
    if (!this.closed) {
      this.closed = true;
      this.resources.forEach(resource => resource.cleanup());
      this.resources.clear();
    }
  }

  /** @returns {boolean} whether close() has been called. */
  isClosed() {
    return this.closed;
  }
}
```

```go !! go
// Go: Channel closing best practices
package main

import (
	"fmt"
	"sync"
)

// SafeChannel wraps a string channel so that sending after Close returns an
// error instead of panicking, and so that Close may be called repeatedly.
type SafeChannel struct {
	ch     chan string
	closed bool         // guarded by mutex
	mutex  sync.RWMutex // senders share it for reading; Close takes it for writing
}

// NewSafeChannel returns an open SafeChannel whose underlying channel has the
// given buffer size.
func NewSafeChannel(bufferSize int) *SafeChannel {
	sc := new(SafeChannel)
	sc.ch = make(chan string, bufferSize)
	return sc
}

// Send delivers value on the underlying channel, or returns an error when
// the channel has already been closed.
//
// The read lock is held for the whole send, which is what keeps Close from
// closing the channel mid-send. The flip side is that a Send blocked on a
// full buffer makes Close wait until a receiver frees a slot.
func (sc *SafeChannel) Send(value string) error {
	sc.mutex.RLock()
	defer sc.mutex.RUnlock()

	if !sc.closed {
		sc.ch <- value
		return nil
	}
	return fmt.Errorf("channel is closed")
}

// Receive takes the next value from the underlying channel. The boolean is
// false once the channel is closed and fully drained.
func (sc *SafeChannel) Receive() (string, bool) {
	received, open := <-sc.ch
	return received, open
}

// Close closes the underlying channel. Only the first call has an effect.
func (sc *SafeChannel) Close() {
	sc.mutex.Lock()
	defer sc.mutex.Unlock()

	if sc.closed {
		return
	}
	sc.closed = true
	close(sc.ch)
}

// IsClosed reports whether Close has been called.
func (sc *SafeChannel) IsClosed() bool {
	sc.mutex.RLock()
	state := sc.closed
	sc.mutex.RUnlock()
	return state
}

// main sends two values, closes the channel, shows that a further send is
// rejected, and then drains what was buffered before the close.
func main() {
	ch := NewSafeChannel(5)

	// Both values fit into the buffer, so these sends return immediately.
	ch.Send("Hello")
	ch.Send("World")

	ch.Close()

	// Sending after Close yields an error rather than a panic.
	if err := ch.Send("Test"); err != nil {
		fmt.Println("Error:", err)
	}

	// Buffered values remain readable after Close; stop once drained.
	for value, ok := ch.Receive(); ok; value, ok = ch.Receive() {
		fmt.Println("Received:", value)
	}
}
```
</UniversalEditor>

### 2. Avoid Channel Leaks

<UniversalEditor title="Preventing Channel Leaks" compare={true}>
```javascript !! js
// JavaScript: Preventing resource leaks
/**
 * Keeps track of every channel it creates so they can all be closed together.
 * Relies on an `EventChannel` class being available in scope.
 */
class ChannelManager {
  constructor() {
    this.channels = new Set();
  }

  /**
   * Creates a new EventChannel, registers it and returns it.
   */
  createChannel() {
    const created = new EventChannel();
    this.channels.add(created);
    return created;
  }

  /**
   * Closes every registered channel and empties the registry.
   */
  closeAll() {
    this.channels.forEach(channel => channel.close());
    this.channels.clear();
  }

  /** Alias for closeAll(). */
  cleanup() {
    this.closeAll();
  }
}
```

```go !! go
// Go: Preventing channel leaks
package main

import (
	"fmt"
	"sync"
	"time"
)

// ChannelManager is a registry of named string channels. Closing goes through
// the registry, so each registered channel is closed at most once.
type ChannelManager struct {
	channels map[string]chan string // guarded by mutex
	mutex    sync.RWMutex
}

// NewChannelManager returns an empty, ready-to-use manager.
func NewChannelManager() *ChannelManager {
	cm := new(ChannelManager)
	cm.channels = make(map[string]chan string)
	return cm
}

// CreateChannel makes a channel with a buffer of 10, registers it under id
// and returns it. Registering an id that is already in use replaces the
// earlier entry without closing the earlier channel.
func (cm *ChannelManager) CreateChannel(id string) chan string {
	created := make(chan string, 10)

	cm.mutex.Lock()
	cm.channels[id] = created
	cm.mutex.Unlock()

	return created
}

// CloseChannel closes and unregisters the channel stored under id.
// Unknown ids, including ones that were already closed, are ignored.
func (cm *ChannelManager) CloseChannel(id string) {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	ch, exists := cm.channels[id]
	if !exists {
		return
	}
	close(ch)
	delete(cm.channels, id)
}

// CloseAll closes and unregisters every channel that is still registered.
func (cm *ChannelManager) CloseAll() {
	cm.mutex.Lock()
	defer cm.mutex.Unlock()

	// Deleting entries while ranging over a map is permitted in Go.
	for id := range cm.channels {
		close(cm.channels[id])
		delete(cm.channels, id)
	}
}

// GetChannelCount returns the number of channels currently registered.
func (cm *ChannelManager) GetChannelCount() int {
	cm.mutex.RLock()
	count := len(cm.channels)
	cm.mutex.RUnlock()
	return count
}

// main registers two channels, lets one goroutine per channel send a message
// and close its channel through the manager, and finally closes whatever is
// still registered.
func main() {
	manager := NewChannelManager()

	ch1 := manager.CreateChannel("worker1")
	ch2 := manager.CreateChannel("worker2")

	fmt.Printf("Active channels: %d\n", manager.GetChannelCount())

	// Each worker sends one message, waits 100ms and then asks the manager to
	// close its channel. CloseChannel ignores ids that are already gone, so
	// it does not matter whether this runs before or after CloseAll below.
	announce := func(id string, ch chan<- string, msg string) {
		ch <- msg
		time.Sleep(100 * time.Millisecond)
		manager.CloseChannel(id)
	}
	go announce("worker1", ch1, "Hello from worker1")
	go announce("worker2", ch2, "Hello from worker2")

	// Receive messages
	msg1, msg2 := <-ch1, <-ch2

	fmt.Println("Received:", msg1, msg2)
	fmt.Printf("Active channels: %d\n", manager.GetChannelCount())

	// Clean up
	manager.CloseAll()
	fmt.Printf("Active channels after cleanup: %d\n", manager.GetChannelCount())
}
```
</UniversalEditor>

---

### Practice Questions:
1. What is the difference between a send-only and receive-only channel?
2. How does the `default` case in `select` enable non-blocking operations?
3. Explain the fan-out and fan-in patterns and when to use each.
4. How can you implement timeout functionality using channels and `select`?
5. What are the best practices for closing channels and preventing leaks?

### Project Idea:
Create a load balancer that uses channels to distribute incoming requests across multiple worker goroutines. The load balancer should implement health checks, graceful shutdown, and request timeout handling using the patterns learned in this module.
